package com.funo.oeip.regcenter.zkcli;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * zookeeper原生api调用
 * @author simonfj
 *
 */
public class ZkClientDemo implements Watcher {
	public static Logger LOG = LoggerFactory.getLogger(ZkClientDemo.class);
	/** ZooKeeper */
	public ZooKeeper zk;

	/**
	 * 
	 */
	public static CountDownLatch connectedSemaphore = new CountDownLatch(1);
	/**
	 * 节点名称
	 */
	public static final String ZNODE1 = "/znode001";
	public static final String ZNODE11 = "/znode001/znode011";
	public static final String ZNODE12 = "/znode001/znode012";	
	/**
	 * 连接串
	 */
	//public static final String CONNECTION_STR = "172.16.16.193:2181";
	/**
	 * 集群方式
	 */
	public static final String CONNECTION_STR = "172.16.16.193:2181,172.16.16.216:2181,172.16.16.212:2181";
	public static final int SESSION_TIMEOUT = 5000;

	public static void main(String[] args) {
		ZkClientDemo zkClientDemo = new ZkClientDemo();
		zkClientDemo.connectionZookeeper(CONNECTION_STR, SESSION_TIMEOUT, zkClientDemo);
		boolean isExists = zkClientDemo.isExists(ZNODE1,true);
		if (isExists) {
			System.out.println("节点已存在:" + ZNODE1);
		} else {
			zkClientDemo.createPath(ZNODE1, "znode001-value");
			System.out.println("创建节点成功:" + ZNODE1);
		}
		zkClientDemo.isExists(ZNODE1,true);
		zkClientDemo.writeData(ZNODE1, "777777");
		String value = zkClientDemo.readData(ZNODE1);
		System.out.println("节点znode1值:" + value);
		zkClientDemo.isExists(ZNODE1,true);
		zkClientDemo.writeData(ZNODE1, "888888");
		value = zkClientDemo.readData(ZNODE1);
		System.out.println("变更后，节点znode1值:" + value);
		//创建子节点begin
//		zkClientDemo.createPath(ZNODE11, "c777777");		
//		zkClientDemo.createPath(ZNODE12, "c777777");
//		List<String> childList = zkClientDemo.getChild(ZNODE1);
//		for(int i=0;i<childList.size();i++) {
//			System.out.println(childList.get(i));
//		}
		//创建子节点end
//		zkClientDemo.isExists(ZNODE1,true);
//		zkClientDemo.deletePath(ZNODE1);
		
		try {
			connectedSemaphore.await();
			zkClientDemo.releaseConnection();
		} catch (InterruptedException e) {

			e.printStackTrace();
		}

	}

	/**
	 * <p>
	 * 连接Zookeeper
	 * </p>
	 * 
	 * <pre>
	 *     [关于connectString服务器地址配置]
	 *     格式: 192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181
	 *     这个地址配置有多个ip:port之间逗号分隔,底层操作
	 *     ConnectStringParser connectStringParser =  new ConnectStringParser(“192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181”);
	 *     这个类主要就是解析传入地址列表字符串，将其它保存在一个ArrayList中
	 *     ArrayList<InetSocketAddress> serverAddresses = new ArrayList<InetSocketAddress>();
	 *     接下去，这个地址列表会被进一步封装成StaticHostProvider对象，并且在运行过程中，一直是这个对象来维护整个地址列表。
	 *     ZK客户端将所有Server保存在一个List中，然后随机打乱(这个随机过程是一次性的)，并且形成一个环，具体使用的时候，从0号位开始一个一个使用。
	 *     因此，Server地址能够重复配置，这样能够弥补客户端无法设置Server权重的缺陷，但是也会加大风险。
	 *
	 *     [客户端和服务端会话说明]
	 *     ZooKeeper中，客户端和服务端建立连接后，会话随之建立，生成一个全局唯一的会话ID(Session ID)。
	 *     服务器和客户端之间维持的是一个长连接，在SESSION_TIMEOUT时间内，服务器会确定客户端是否正常连接(客户端会定时向服务器发送heart_beat，服务器重置下次SESSION_TIMEOUT时间)。
	 *     因此，在正常情况下，Session一直有效，并且ZK集群所有机器上都保存这个Session信息。
	 *     在出现网络或其它问题情况下（例如客户端所连接的那台ZK机器挂了，或是其它原因的网络闪断）,客户端与当前连接的那台服务器之间连接断了,
	 *     这个时候客户端会主动在地址列表（实例化ZK对象的时候传入构造方法的那个参数connectString）中选择新的地址进行连接。
	 *
	 *     [会话时间]
	 *     客户端并不是可以随意设置这个会话超时时间，在ZK服务器端对会话超时时间是有限制的，主要是minSessionTimeout和maxSessionTimeout这两个参数设置的。
	 *     如果客户端设置的超时时间不在这个范围，那么会被强制设置为最大或最小时间。 默认的Session超时时间是在2 * tickTime ~ 20 * tickTime
	 * </pre>
	 * 
	 * @param connectString
	 *            Zookeeper服务地址
	 * @param sessionTimeout
	 *            Zookeeper连接超时时间
	 *            
	 * @param watcher
	 * 				watcher
	 */
	public void connectionZookeeper(String connectString, int sessionTimeout, Watcher watcher) {
		this.releaseConnection();
		try {
			// ZK客户端允许我们将ZK服务器的所有地址都配置在这里
			zk = new ZooKeeper(connectString, sessionTimeout, watcher);
		} catch (IOException e) {
			LOG.error("连接创建失败，发生 IOException , e " + e.getMessage(), e);
		}
	}

	/**
	 * <p>
	 * 创建zNode节点, String create(path<节点路径>, data[]<节点内容>, List(ACL访问控制列表),
	 * CreateMode<zNode创建类型>)
	 * </p>
	 * <br/>
	 * 
	 * <pre>
	 *     节点创建类型(CreateMode)
	 *     1、PERSISTENT:持久化节点
	 *     2、PERSISTENT_SEQUENTIAL:顺序自动编号持久化节点，这种节点会根据当前已存在的节点数自动加 1
	 *     3、EPHEMERAL:临时节点客户端,session超时这类节点就会被自动删除
	 *     4、EPHEMERAL_SEQUENTIAL:临时自动编号节点
	 * </pre>
	 * 
	 * @param path
	 *            zNode节点路径
	 * @param data
	 *            zNode数据内容
	 * @return 创建成功返回true, 反之返回false.
	 */
	public boolean createPath(String path, String data) {
		try {
			String zkPath = zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
			LOG.info("节点创建成功, Path: " + zkPath + ", content: " + data);
			return true;
		} catch (KeeperException e) {
			LOG.error("节点创建失败, 发生KeeperException! path: " + path + ", data:" + data + ", errMsg:" + e.getMessage(), e);
		} catch (InterruptedException e) {
			LOG.error(
					"节点创建失败, 发生 InterruptedException! path: " + path + ", data:" + data + ", errMsg:" + e.getMessage(),
					e);
		}
		return false;
	}

	/**
	 * <p>
	 * 删除一个zMode节点, void delete(path<节点路径>, stat<数据版本号>)
	 * </p>
	 * <br/>
	 * 
	 * <pre>
	 *     说明
	 *     1、版本号不一致,无法进行数据删除操作.
	 *     2、如果版本号与znode的版本号不一致,将无法删除,是一种乐观加锁机制;如果将版本号设置为-1,不会去检测版本,直接删除.
	 * </pre>
	 * 
	 * @param path
	 *            zNode节点路径
	 * @return 删除成功返回true,反之返回false.
	 */
	public boolean deletePath(String path) {
		try {
			zk.delete(path, -1);
			LOG.info("节点删除成功, Path: " + path);
			return true;
		} catch (KeeperException e) {
			LOG.error("节点删除失败, 发生KeeperException! path: " + path + ", errMsg:" + e.getMessage(), e);
		} catch (InterruptedException e) {
			LOG.error("节点删除失败, 发生 InterruptedException! path: " + path + ", errMsg:" + e.getMessage(), e);
		}
		return false;
	}

	/**
	 * <p>
	 * 更新指定节点数据内容, Stat setData(path<节点路径>, data[]<节点内容>, stat<数据版本号>)
	 * </p>
	 * 
	 * <pre>
	 *     设置某个znode上的数据时如果为-1，跳过版本检查
	 * </pre>
	 * 
	 * @param path
	 *            zNode节点路径
	 * @param data
	 *            zNode数据内容
	 * @return 更新成功返回true,返回返回false
	 */
	public boolean writeData(String path, String data) {
		try {
			Stat stat = this.zk.setData(path, data.getBytes(), -1);
			LOG.info("更新数据成功, path：" + path + ", stat: " + stat);
			return true;
		} catch (KeeperException e) {
			LOG.error("更新数据失败, 发生KeeperException! path: " + path + ", data:" + data + ", errMsg:" + e.getMessage(), e);
		} catch (InterruptedException e) {
			LOG.error("更新数据失败, 发生InterruptedException! path: " + path + ", data:" + data + ", errMsg:" + e.getMessage(),
					e);
		}
		return false;
	}

	/**
	 * <p>
	 * 读取指定节点数据内容,byte[] getData(path<节点路径>, watcher<监视器>, stat<数据版本号>)
	 * </p>
	 * 
	 * @param path
	 *            zNode节点路径
	 * @return 节点存储的值,有值返回,无值返回null
	 */
	public String readData(String path) {
		String data = null;
		try {
			data = new String(this.zk.getData(path, false, null));
			LOG.info("读取数据成功, path:" + path + ", content:" + data);
		} catch (KeeperException e) {
			LOG.error("读取数据失败,发生KeeperException! path: " + path + ", errMsg:" + e.getMessage(), e);
		} catch (InterruptedException e) {
			LOG.error("读取数据失败,发生InterruptedException! path: " + path + ", errMsg:" + e.getMessage(), e);
		}
		return data;
	}

	/**
	 * <p>
	 * 判断某个zNode节点是否存在, Stat exists(path<节点路径>, watch<并设置是否监控这个目录节点，这里的 watcher 是在创建
	 * ZooKeeper 实例时指定的 watcher>)
	 * </p>
	 * 
	 * @param path
	 *            zNode节点路径
	 * @return 存在返回true,反之返回false
	 */
	public boolean isExists(String path) {
		try {
			Stat stat = this.zk.exists(path, false);
			return null != stat;
		} catch (KeeperException e) {
			LOG.error("读取数据失败,发生KeeperException! path: " + path + ", errMsg:" + e.getMessage(), e);
		} catch (InterruptedException e) {
			LOG.error("读取数据失败,发生InterruptedException! path: " + path + ", errMsg:" + e.getMessage(), e);
		}
		return false;
	}
	
	/**
	 * <p>
	 * 判断某个zNode节点是否存在, Stat exists(path<节点路径>, watch<并设置是否监控这个目录节点，这里的 watcher 是在创建
	 * ZooKeeper 实例时指定的 watcher>)
	 * </p>
	 * 
	 * @param path
	 *            zNode节点路径
	 * @return 存在返回true,反之返回false
	 */
	public boolean isExists(String path, boolean needWatch) {
		try {
			Stat stat = this.zk.exists(path, needWatch);
			return null != stat;
		} catch (KeeperException e) {
			LOG.error("读取数据失败,发生KeeperException! path: " + path + ", errMsg:" + e.getMessage(), e);
		} catch (InterruptedException e) {
			LOG.error("读取数据失败,发生InterruptedException! path: " + path + ", errMsg:" + e.getMessage(), e);
		}
		return false;
	}	

	/**
	 * <p>
	 * 获取某个节点下的所有子节点,List getChildren(path<节点路径>, watcher<监视器>)该方法有多个重载
	 * </p>
	 * 
	 * @param path
	 *            zNode节点路径
	 * @return 子节点路径集合 说明,这里返回的值为节点名
	 * 
	 *         <pre>
	 *     eg.
	 *     /node
	 *     /node/child1
	 *     /node/child2
	 *     getChild( "node" )户的集合中的值为["child1","child2"]
	 *         </pre>
	 *
	 * @throws KeeperException
	 * @throws InterruptedException
	 */
	public List<String> getChild(String path) {
		try {
			List<String> list = this.zk.getChildren(path, false);
			if (list.isEmpty()) {
				LOG.info("中没有节点" + path);
			}
			return list;
		} catch (KeeperException e) {
			LOG.error("读取子节点数据失败,发生KeeperException! path: " + path + ", errMsg:" + e.getMessage(), e);
		} catch (InterruptedException e) {
			LOG.error("读取子节点数据失败,发生InterruptedException! path: " + path + ", errMsg:" + e.getMessage(), e);
		}
		return null;
	}

	/**
	 * 关闭ZK连接
	 */
	public void releaseConnection() {
		if (null != zk) {
			try {
				this.zk.close();
			} catch (InterruptedException e) {
				LOG.error("release connection error ," + e.getMessage(), e);
			}
		}
	}



	/**
	 * 节点时间处理
	 * 
	 * @param eventType
	 * @param nodePath
	 */
	private void nodeEventTypeHandler(EventType eventType, String nodePath) {
		// 没有任何节点，表示创建连接成功(客户端与服务器端创建连接成功后没有任何节点信息)
		if (EventType.None == eventType) {
			LOG.info("EventType--成功链接zookeeper服务器");
			isExists(nodePath, true);
			//connectedSemaphore.countDown(); // 通知阻塞的线程可以继续执行
		} else if (EventType.NodeCreated == eventType) { // 当服务器端创建节点的时候触发
			LOG.info("EventType--zookeeper服务端创建新的节点");
			// zookeeper服务端创建一个新的节点后并对其进行监控,创建完后接着对该节点进行监控,没有此代码将不会在监控该节点
			isExists(nodePath, true);
		} else if (EventType.NodeDataChanged == eventType) { // 被监控该节点的数据发生变更的时候触发
			LOG.info("EventType--节点的数据更新");
			// 跟新完后接着对该节点进行监控,没有此代码将不会在监控该节点
			readData(nodePath);
			isExists(nodePath, true);
		} else if (EventType.NodeChildrenChanged == eventType) {			
			// 对应本代码而言只能监控根节点的一级节点变更。如：在根节点直接创建一级节点，
			// 或者删除一级节点时触发。如修改一级节点的数据，不会触发，创建二级节点时也不会触发。
			LOG.info("EventType--子节点发生变更");
			LOG.info("子节点列表：" + this.getChild(nodePath));
			isExists(nodePath, true);
		} else if (EventType.NodeDeleted == eventType) {
			LOG.info("EventType--节点：" + nodePath + "被删除");
			connectedSemaphore.countDown(); // 通知阻塞的线程可以继续执行
		}

	}

	public void process(WatchedEvent event) {
		// 取得连接状态
		KeeperState state = event.getState();
		// 取得事件类型
		EventType eventType = event.getType();
		// 哪一个节点路径发生变更
		String nodePath = event.getPath();

		LOG.info("收到Watcher的通知");
		LOG.info("znode:" + nodePath);
		LOG.info("连接状态：" + state);
		LOG.info("事件类型：" + eventType);

		if (KeeperState.SyncConnected == state) {// 连接成功
			LOG.info("客户端连接zookeeper连接成功--------");
			nodeEventTypeHandler(eventType, nodePath);
		} else if (KeeperState.Disconnected == state) {
			LOG.info("客户端连接zookeeper服务器端失败");
		} else if (KeeperState.Expired == state) {
			LOG.info("客户端与zookeeper服务器端会话失败");
		} else if (KeeperState.AuthFailed == state) {
			LOG.info("权限认证失败");
		}

	}

}
